package com.chenzhiling.study.hadoop

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io.IOUtils

import java.io.IOException
import java.net.URI

/**
 * @Author: CHEN ZHI LING
 * @Date: 2021/8/5
 * @Description: scala hadoop Api
 */
object HadoopApi {


  val hdfsPath = "hdfs://MasterChen:9000"
  var fileSystem: FileSystem = _

  /**
   * 获得hadoop文件系统
   * @return
   */
  def getFileSystem:FileSystem={
    try { //创建URI对象
      val uri = new URI(hdfsPath)
      //获取文件系统
      val configuration = new Configuration
      fileSystem = FileSystem.get(uri,configuration)
      fileSystem
    } catch {
      case e: Exception =>
        e.printStackTrace()
        fileSystem
    }
  }


  /**
   * 创建文件夹
   * @param fileSystem fs
   * @param path 路径
   */
  def mkDir(fileSystem: FileSystem, path: String): Unit = {
    try {
      val srcPath = new Path(path)
      fileSystem.mkdirs(srcPath)
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      fileSystem.close()
    }
  }


  /**
   * 上传文件到集群
   * @param fileSystem 文件系统
   * @param src 源路径
   * @param dst 目标路径
   */
  def uploadFile(fileSystem: FileSystem, src: String, dst: String): Unit = {
    try {
      //创建原路径Path对象
      val srcPath = new Path(src)
      //创建目标路径Path对象
      val dstPath = new Path(dst)
      //调用文件系统的文件复制函数，前面的参数是指是否删除源文件，true为删除，否则不删除
      fileSystem.copyFromLocalFile(srcPath, dstPath)
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      //关闭文件系统
      fileSystem.close()
    }
  }


  /**
   * 从集群下载文件到本地
   * @param fileSystem hadoop集群
   * @param localFileSystem 本地文件系统
   * @param src 源路径
   * @param dst 目标路径
   */
  def downloadFile(fileSystem: FileSystem,localFileSystem: LocalFileSystem, src: String, dst: String): Unit = {
    var in:FSDataInputStream = null
    var out:FSDataOutputStream = null
    try {
      val srcPath = new Path(src)
      val dstPath = new Path(dst)
      //通过path构建文件系统的输入流
      in= fileSystem.open(srcPath)
      //构建输出流
      out = localFileSystem.create(dstPath)
      //构建输出流
      //下载
      IOUtils.copyBytes(in, out, 1024, true)
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }finally {
      in.close()
      out.close()
      fileSystem.close()
      localFileSystem.close()
    }
  }


  /**
   * 删除文件
   * @param fileSystem 文件系统
   * @param path 路径
   */
  def deleteFile(fileSystem: FileSystem, path: String): Unit = {
    val p = new Path(path)
    try {
      fileSystem.delete(p, true)
    }catch {
      case e:IOException =>
        e.printStackTrace()
    }finally {
      fileSystem.close()
    }
  }
}
